package com.cs.rabbittest.demo.topics;

import com.cs.rabbittest.demo.BaseFactory;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Demo consumer for the "inform" topic exchange: declares the exchange and the
 * email queue, binds them with {@link #ROUTING_KEY_EMAIL}, and then consumes
 * messages from the email queue using manual acknowledgements.
 */
public class Consumer_topics_email {
    /**
     * Name of the email queue.
     */
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    /**
     * Binding pattern used when binding the email queue to the exchange.
     * Topic routing keys are lists of words separated by ".".
     * In a binding pattern "#" matches zero or more words and "*" matches
     * exactly one word.
     */
    private static final String ROUTING_KEY_EMAIL = "inform.#.email.#";
    /**
     * Name of the SMS queue. Not referenced anywhere in this class.
     */
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    /**
     * Binding pattern for the SMS queue (same wildcard rules as
     * {@link #ROUTING_KEY_EMAIL}). Not referenced anywhere in this class.
     */
    private static final String ROUTING_KEY_SMS = "inform.#.sms.#";

    /**
     * Name of the topic exchange.
     */
    private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";


    /**
     * Starts the email consumer.
     *
     * <p>The connection and channel are intentionally left open: deliveries are
     * pushed to the registered consumer for as long as the channel stays open.
     *
     * @param args command-line arguments (unused)
     * @throws IOException      if declaring, binding or subscribing fails
     * @throws TimeoutException if opening the connection or channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Despite its name, BaseFactory.getConnection() hands back a ConnectionFactory.
        ConnectionFactory factory = BaseFactory.getConnection();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the topic exchange.
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);

        // Declare the queue: durable, not exclusive, not auto-delete, no extra arguments.
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);

        // Bind the queue to the exchange using the email binding pattern.
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM, ROUTING_KEY_EMAIL);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * Handles one delivery: decodes the body as UTF-8, simulates two
             * seconds of work, prints the message and then acknowledges it.
             *
             * <p>If the simulated work is interrupted, the interrupt flag is
             * restored and the delivery is left unacknowledged so the broker can
             * redeliver it after this channel closes, instead of acknowledging a
             * message that was never processed.
             *
             * @param consumerTag the consumer tag associated with this consumer
             * @param envelope    delivery metadata; supplies the delivery tag used for the ack
             * @param properties  content header properties of the message (unused)
             * @param body        raw message payload, decoded as UTF-8 text
             * @throws IOException if the acknowledgement cannot be sent
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                // Simulate time-consuming message processing.
                try {
                    TimeUnit.SECONDS.sleep(2L);
                } catch (InterruptedException e) {
                    // Restore the flag so the owning thread can observe the interruption.
                    Thread.currentThread().interrupt();
                    System.err.println("Interrupted while processing delivery "
                            + envelope.getDeliveryTag() + "; leaving it unacknowledged");
                    // No ack and no immediate requeue: an immediate requeue on an
                    // interrupted thread would spin in a tight redelivery loop.
                    return;
                }
                System.out.printf("topics消费者收到信息:%s\n", message);
                // Manually acknowledge this single delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Start consuming with autoAck disabled; the consumer acks each delivery itself.
        channel.basicConsume(QUEUE_INFORM_EMAIL, false, consumer);
    }
}
